/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lvyh.lightframe.registry.zookeeper;

import com.lvyh.lightframe.common.util.CollectionUtil;
import com.lvyh.lightframe.common.constant.RpcConstants;
import com.lvyh.lightframe.common.ext.Spi;
import com.lvyh.lightframe.registry.Registry;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * ZooKeeper-backed {@link Registry}, published under the SPI key {@code "zk"}.
 *
 * <p>Node layout used by this class:
 * <pre>
 * {@link RpcConstants#ZK_REGISTRY_PATH}          (persistent)
 *   └── &lt;serviceName&gt;                          (persistent)
 *         └── address-&lt;sequence&gt;               (ephemeral sequential, data = service address)
 * </pre>
 *
 * <p>A single {@link ZkClient} is created in the constructor and reused by every
 * call; this class never closes it.
 *
 * @author lvyh 2021/05/23.
 */
@Spi("zk")
public class ZookeeperRegistry implements Registry {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperRegistry.class);

    private final ZkClient zkClient;

    /**
     * Creates the registry and its underlying ZooKeeper client.
     *
     * @param zkAddress ZooKeeper connection string passed straight to {@link ZkClient}
     */
    public ZookeeperRegistry(String zkAddress) {
        zkClient = new ZkClient(zkAddress, RpcConstants.ZK_SESSION_TIMEOUT, RpcConstants.ZK_CONNECTION_TIMEOUT);
        LOGGER.info("connect zookeeper");
    }

    /**
     * Publishes {@code serviceAddress} as a provider of {@code serviceName}.
     *
     * <p>The registry root and the service node are created as persistent nodes when
     * missing; the address itself is stored as the data of a new ephemeral sequential
     * child of the service node.
     *
     * @param serviceName    name of the service; used as the node name under the registry root
     * @param serviceAddress address stored as the data of the created address node
     */
    @Override
    public void register(String serviceName, String serviceAddress) {

        // Create registry node (persistent).
        // createPersistent(path, true) tolerates "node already exists", so two providers
        // that both pass the exists() check at the same time no longer fail the loser.
        String registryPath = RpcConstants.ZK_REGISTRY_PATH;
        if (!zkClient.exists(registryPath)) {
            zkClient.createPersistent(registryPath, true);
            LOGGER.info("create registry node: {}", registryPath);
        }

        // Create service node (persistent), with the same race tolerance as above.
        String servicePath = registryPath + "/" + serviceName;
        if (!zkClient.exists(servicePath)) {
            zkClient.createPersistent(servicePath, true);
            LOGGER.info("create service node: {}", servicePath);
        }

        // Create address node (temporary)
        String addressPath = servicePath + "/address-";
        String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
        LOGGER.info("create address node: {}", addressNode);
    }

    /**
     * Looks up every address currently registered for the given service.
     *
     * <p>Address nodes are ephemeral and may disappear between listing the children and
     * reading their data; such nodes (and nodes holding no data) are skipped instead of
     * failing the whole lookup.
     *
     * @param name name of the service to look up
     * @return the data of every readable address node under the service node; never empty
     * @throws RuntimeException if the service node does not exist or no address could be read
     */
    @Override
    public List<String> discover(String name) {
        // Get service node
        String servicePath = RpcConstants.ZK_REGISTRY_PATH + "/" + name;
        LOGGER.info("discover service on path: {}", servicePath);
        if (!zkClient.exists(servicePath)) {
            throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
        }

        List<String> addressNodeList = zkClient.getChildren(servicePath);
        if (CollectionUtil.isEmpty(addressNodeList)) {
            throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
        }

        List<String> addressList = new ArrayList<>(addressNodeList.size());
        for (String addressNode : addressNodeList) {
            // Gets the value of the address node; 'true' makes readData return null
            // instead of throwing when the node vanished after getChildren().
            String addressPath = servicePath + "/" + addressNode;
            String addressData = zkClient.readData(addressPath, true);
            if (addressData == null) {
                LOGGER.warn("skip address node without data (removed or empty): {}", addressPath);
                continue;
            }
            addressList.add(addressData);
        }

        // Every listed node vanished or was empty: report it like "no address node".
        if (addressList.isEmpty()) {
            throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
        }
        return addressList;
    }
}